﻿/**

 * Copyright (c) 2015-2016, FastDev 刘强 (fastdev@163.com) & Quincy.

 *

 * Licensed under the Apache License, Version 2.0 (the "License");

 * you may not use this file except in compliance with the License.

 * You may obtain a copy of the License at

 *

 *      http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

using ConsoleApplication1.Cluster.Entity;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using OF.Notify.DataHost.Cluster.Disk;
using OF.Notify.DataHost.Cluster.Entity;
using OF.Notify.Entity;
using OF.Notify.Master;
using OF.DistributeService.Core.Common;
using OF.Notify.Channel;
using System.Web;
using ZooKeeperNet;
using System.Diagnostics;
using OF.Notify.Common;

namespace OF.Notify.DataHost.Cluster
{

    public class DataNode : INotifyService
    {
        // Set to true by DoDispose(); Start() checks it between steps and returns early once it is set.
        public bool isDisposed = false;
        // Identifier of this node; replaced in the constructor with dataNodeConfig.NotifyHostId.
        public Int16 Id = ClusterContext.InvalidateId;
        // Parsed in Start() from the third '|'-separated part of the path returned when the ZooKeeper node is created.
        public int sequenceNo;
        // Copied from dataNodeConfig.NotifyHostUrl in the constructor; URL-encoded into the ZooKeeper node name in Start().
        public string notifyHostUrl = null;

        // Configuration object passed to the constructor.
        public DataNodeConfig dataNodeConfig;
        // Created by RefreshDataNodeCluster() while this node's Id equals currentMasterNodeId; disposed and reset to null otherwise.
        public ClusterMaster masterNode = null;
        // ZooKeeper proxy created in Start() and disposed (then set to null) in DoDispose().
        internal ZooKeeperProxy zk = null;
        // Watcher handed to zk.GetChildren() in RefreshDataNodeCluster(); wired to this node in Start() via SetZk().
        internal DataNodeClusterWatch dataNodeClusterWatcher = new DataNodeClusterWatch();
        // Proxies parsed from the children of NotifyConfig.TopicParent; rebuilt on every RefreshDataNodeCluster() call.
        internal List<DataNodeProxy> currentDataNodeList = null;


        // Proxy of the node with the lowest sequence number; only replaced when the master id changes, null when no nodes are listed.
        public DataNodeProxy masterDataNodeProxy = null;
        // Id of the node with the lowest sequence number, or -1 when no nodes are listed.
        public int currentMasterNodeId = -1;

        // Per-topic TopicDataNode instances; built by InitNodeTopicDataManager().
        internal TopicInstanceProvider<TopicDataNode> nodeTopicDataManager = null;


        // Shared instance returned by GetOnlineNodeIdList() when no nodes are known.
        internal static readonly List<short> EmptyOnLineNodeList = new List<short>();

		/// <summary>
		/// Returns the reference currently stored in <c>currentDataNodeList</c>.
		/// </summary>
		/// <returns>The stored list, or <c>null</c> if it has not been assigned yet.</returns>
		public List<DataNodeProxy> GetCurrentDataNodeList()
		{
			List<DataNodeProxy> nodes = this.currentDataNodeList;
			return nodes;
		}

        /// <summary>
        /// Creates one <c>TopicDataNode</c> for every topic reported by
        /// <c>ClusterContext.Get().GetAllSupportedTopic()</c> and stores them in
        /// <c>nodeTopicDataManager</c>.
        /// </summary>
        internal void InitNodeTopicDataManager()
        {
            var topics = ClusterContext.Get().GetAllSupportedTopic();

            // Pre-size the dictionary to the number of supported topics.
            var instancesByTopic = new Dictionary<byte, TopicDataNode>(topics.Count);
            foreach (var topicId in topics)
            {
                var topicNode = new TopicDataNode(this, topicId);
                instancesByTopic.Add(topicId, topicNode);
            }

            nodeTopicDataManager = new TopicInstanceProvider<TopicDataNode>(instancesByTopic);
        }

        /// <summary>
        /// Collects the ids of all nodes in <c>currentDataNodeList</c>.
        /// </summary>
        /// <returns>
        /// A new list of node ids, or the shared <c>EmptyOnLineNodeList</c> instance when the
        /// node list is <c>null</c> or empty.
        /// </returns>
        public List<short> GetOnlineNodeIdList()
        {
            if (currentDataNodeList != null && currentDataNodeList.Count > 0)
            {
                var onlineIds = new List<short>();
                foreach (var proxy in currentDataNodeList)
                {
                    onlineIds.Add(proxy.GetId());
                }
                return onlineIds;
            }

            // Nothing known yet: hand back the shared empty instance rather than allocating.
            return EmptyOnLineNodeList;
        }

        /// <summary>
        /// Tells whether the given proxy carries the same id as this node.
        /// </summary>
        /// <param name="dataNodeProxy">Proxy whose id is compared with <c>Id</c>.</param>
        /// <returns><c>true</c> when both ids are equal.</returns>
        public bool IsSameNode(DataNodeProxy dataNodeProxy)
        {
            var otherId = dataNodeProxy.GetId();
            return otherId == this.Id;
        }

        /// <summary>
        /// Builds a data node from its configuration and registers it with the notify
        /// service provider obtained from <c>ClusterContext</c>.
        /// </summary>
        /// <param name="dataNodeConfig">Supplies the host id, host URL and ZooKeeper connection.</param>
        /// <exception cref="NotSupportedException">
        /// <c>NotifyHostId</c> is greater than <c>Int16.MaxValue / 3 - 3</c>.
        /// </exception>
        public DataNode(DataNodeConfig dataNodeConfig)
        {
            const int maxHostId = Int16.MaxValue / 3 - 3;

            this.dataNodeConfig = dataNodeConfig;
            bool idTooLarge = dataNodeConfig.NotifyHostId > maxHostId;
            if (idTooLarge)
            {
                throw new NotSupportedException("NotifyHostId can't exceed max value:" + maxHostId);
            }

            this.notifyHostUrl = dataNodeConfig.NotifyHostUrl;
            this.Id = dataNodeConfig.NotifyHostId;

            var serviceProvider = ClusterContext.Get().GetNotifyServiceProvider();
            serviceProvider.InitCurrentDataNode(this);
        }

        /// <summary>
        /// Brings the node online: builds the per-topic instances, connects to ZooKeeper,
        /// removes child entries under <c>NotifyConfig.TopicParent</c> whose first
        /// '|'-separated part equals this node's id, creates an ephemeral sequential entry
        /// for this node, refreshes the cluster view and starts the sync thread of every topic.
        /// Returns early at several points once <c>isDisposed</c> is set.
        /// </summary>
        public void Start()
        {
            if (isDisposed)
            {
                return;
            }

            InitNodeTopicDataManager();

            if (isDisposed)
            {
                return;
            }

            zk = new ZooKeeperProxy(dataNodeConfig.ZookeeperConnection, new TimeSpan(0, 0, 0, 0, 60000), new ZooKeeperSafeConnectWatcher(), null);

            string parentPath = NotifyConfig.TopicParent;
            // Entry name layout: <id>|<url-encoded host url>|<sequence appended on creation>.
            string registrationPath = parentPath + "/" + Id + "|" + HttpUtility.UrlEncode(notifyHostUrl) + "|";
            zk.SafeCreatePath(parentPath, ZooKeeperProxy.EmptyBuffer);

            var existingChildren = zk.GetChildren(parentPath, false);
            if (existingChildren != null)
            {
                // Collect first, delete afterwards, so the child enumeration is finished before any delete.
                var staleEntries = new List<string>();
                foreach (var childName in existingChildren)
                {
                    if (childName.Split('|')[0].Equals(Id.ToString()))
                    {
                        staleEntries.Add(childName);
                    }
                }

                foreach (var staleName in staleEntries)
                {
                    zk.Delete(parentPath + "/" + staleName, -1);
                }
            }

            // NOTE(review): from here on zk is assigned; the early returns below do not dispose it.
            // Presumably DoDispose() is expected to have released it - confirm.
            if (isDisposed)
            {
                return;
            }

            string createdPath = zk.Create(registrationPath, ZooKeeperProxy.EmptyBuffer, Ids.OPEN_ACL_UNSAFE, CreateMode.EphemeralSequential);
            string[] pathParts = createdPath.Split('|');
            sequenceNo = int.Parse(pathParts[2]);

            dataNodeClusterWatcher.SetZk(zk, this);
            RefreshDataNodeCluster();

            if (isDisposed)
            {
                return;
            }

            foreach (var topicId in ClusterContext.Get().GetAllSupportedTopic())
            {
                var topicNode = nodeTopicDataManager.GetInstance(topicId);
                topicNode.StartSyncThread();
            }
        }

        /// <summary>
        /// Re-reads the children of <c>NotifyConfig.TopicParent</c> (passing
        /// <c>dataNodeClusterWatcher</c> to the read), rebuilds <c>currentDataNodeList</c>
        /// and re-evaluates which node is master: the one with the lowest sequence number.
        /// Creates, notifies or disposes <c>masterNode</c> accordingly.
        /// </summary>
        internal void RefreshDataNodeCluster()
        {
            var children = zk.GetChildren(NotifyConfig.TopicParent, dataNodeClusterWatcher);
            List<string> childNames = children == null ? new List<string>() : children.ToList();
            currentDataNodeList = childNames.Select(name => DataNodeProxy.Parse(name)).ToList();

            if (currentDataNodeList.Count == 0)
            {
                // No node registered at all: there is no master, so give up the role if held.
                currentMasterNodeId = -1;
                masterDataNodeProxy = null;
                DisposeIfMaster();
                return;
            }

            var lowestSeqNode = currentDataNodeList.OrderBy(node => node.GetSeq()).First();
            int previousMasterId = currentMasterNodeId;
            currentMasterNodeId = lowestSeqNode.GetId();

            // The proxy is only swapped when the master id actually changed.
            if (previousMasterId != currentMasterNodeId)
            {
                masterDataNodeProxy = lowestSeqNode;
            }

            if (currentMasterNodeId != this.Id)
            {
                // Another node is master now.
                DisposeIfMaster();
                return;
            }

            if (this.masterNode != null)
            {
                this.masterNode.NotifyClusterChanged(currentDataNodeList);
            }
            else
            {
                this.masterNode = new ClusterMaster(zk, currentDataNodeList);
            }
        }

        /// <summary>
        /// Gives up the master role: if <c>masterNode</c> is set, clears the field, disposes
        /// the instance and logs that this node no longer has a master instance.
        /// Does nothing when <c>masterNode</c> is already <c>null</c>.
        /// </summary>
        public void DisposeIfMaster()
        {
            // Take the instance and clear the field in one atomic step. Other code paths in this
            // class also reset masterNode and may run on a different thread (e.g. the ZooKeeper
            // watcher callback); a separate check-then-use could dispose the same instance twice
            // or dereference a field that was just cleared.
            ClusterMaster formerMaster = Interlocked.Exchange(ref this.masterNode, null);
            if (formerMaster == null)
            {
                return;
            }

            formerMaster.DoDispose();
            Util.LogInfo("matser node is null:" + this.Id);
        }

        /// <summary>
        /// Watcher passed to <c>zk.GetChildren</c> by <c>RefreshDataNodeCluster</c>. It forwards
        /// child-list changes to the owning <c>DataNode</c> and makes the node give up the
        /// master role when an event reports a disconnected or expired state.
        /// </summary>
        public class DataNodeClusterWatch : IWatcher
        {
            internal ZooKeeperProxy zk;
            internal DataNode dataNode;

            /// <summary>
            /// Stores the ZooKeeper proxy and the data node this watcher acts on.
            /// </summary>
            /// <param name="zk">Proxy whose disposed state is checked before refreshing.</param>
            /// <param name="dataNode">Node that is refreshed or told to step down.</param>
            public void SetZk(ZooKeeperProxy zk, DataNode dataNode)
            {
                this.zk = zk;
                this.dataNode = dataNode;
            }

            /// <summary>
            /// Handles one watch event. Exceptions are logged through <c>Util.LogException</c>
            /// and never leave this method.
            /// </summary>
            /// <param name="event">The event delivered by the ZooKeeper client.</param>
            public void Process(WatchedEvent @event)
            {
                try
                {
                    // NOTE(review): ZooKeeper clients normally deliver session-state notifications
                    // with a null path, so this early return appears to keep the
                    // Disconnected/Expired branch below from running for them. Confirm against the
                    // ZooKeeperNet client before reordering; stepping down on disconnect would also
                    // need a refresh on reconnect so a master is elected again.
                    if (@event.Path == null)
                    {
                        return;
                    }

                    // Read the fields once so every branch works on the same references.
                    DataNode owner = this.dataNode;
                    ZooKeeperProxy proxy = this.zk;
                    if (owner == null)
                    {
                        // SetZk has not supplied a node: nothing to refresh or step down.
                        return;
                    }

                    if (@event.State == KeeperState.Disconnected || @event.State == KeeperState.Expired)
                    {
                        owner.DisposeIfMaster();
                        return;
                    }

                    if (@event.Type == EventType.NodeChildrenChanged)
                    {
                        if (proxy != null && !proxy.IsDisposed())
                        {
                            owner.RefreshDataNodeCluster();
                        }
                    }
                }
                catch (Exception ex)
                {
                    // Only log here. The handler must not touch dataNode, because a failure
                    // inside the catch block would propagate out of Process.
                    Util.LogException("DataNodeClusterWatch", ex);
                }
            }
        }



        /// <summary>
        /// Shuts the node down: sets <c>isDisposed</c>, disposes and clears <c>masterNode</c>
        /// and <c>zk</c> when they are set, then calls <c>ResetNodeStatus</c> on the instance
        /// of every supported topic if <c>nodeTopicDataManager</c> exists.
        /// </summary>
        public void DoDispose()
        {
            // Flag first so that a concurrently running Start() can notice and bail out.
            isDisposed = true;

            if (masterNode != null)
            {
                masterNode.DoDispose();
                masterNode = null;
            }

            if (zk != null)
            {
                zk.Dispose();
                zk = null;
            }

            if (nodeTopicDataManager == null)
            {
                return;
            }

            foreach (var topicId in ClusterContext.Get().GetAllSupportedTopic())
            {
                var topicNode = nodeTopicDataManager.GetInstance(topicId);
                topicNode.ResetNodeStatus();
            }
        }

        /// <summary>
        /// Tells whether the given node id is missing from the list returned by
        /// <c>GetOnlineNodeIdList</c>.
        /// </summary>
        /// <param name="nodeId">Id of the node to look up.</param>
        /// <returns><c>true</c> when the id is not in the online list.</returns>
        public bool IsNodeOffLine(short nodeId)
        {
            List<short> onlineIds = this.GetOnlineNodeIdList();
            bool isOnline = onlineIds.Contains(nodeId);
            return !isOnline;
        }

        /// <summary>
        /// Forwards the request to <c>masterNode</c> when this node currently has one.
        /// </summary>
        /// <param name="request">Request passed on to <c>ClusterMaster.DeliverMessage</c>.</param>
        /// <returns>
        /// The master's result, or an error result with code 1 when <c>masterNode</c> is <c>null</c>.
        /// </returns>
        public CallServiceResult<bool> DeliverMessage(DeliverMessageRequest request)
        {
            // Read the field once: other members of this class reset masterNode to null and may
            // run on another thread, so a second read after the null check could see null.
            ClusterMaster master = this.masterNode;
            if (master == null)
            {
                return CallServiceResult<bool>.GetError(1, "current node is not master node!");
            }

            return master.DeliverMessage(request);
        }

        /// <summary>
        /// Passes the request to the topic instance that <c>nodeTopicDataManager</c> returns
        /// for <c>request.topicEnum</c>.
        /// </summary>
        /// <param name="request">Message request; its <c>topicEnum</c> selects the topic instance.</param>
        /// <returns>The result produced by the topic instance's <c>SendMessage</c>.</returns>
        public CallServiceResult<bool> SendMessage(SendMessageRequest request)
        {
            var topicNode = nodeTopicDataManager.GetInstance(request.topicEnum);
            return topicNode.SendMessage(request);
        }

        /// <summary>
        /// Calls <c>Dump</c> on the instance of the given topic. Does nothing while
        /// <c>nodeTopicDataManager</c> is still <c>null</c>.
        /// </summary>
        /// <param name="topicEnum">Topic whose instance is dumped.</param>
        public void Dump(byte topicEnum)
        {
            if (nodeTopicDataManager == null)
            {
                return;
            }

            var topicNode = nodeTopicDataManager.GetInstance(topicEnum);
            topicNode.Dump();
        }

        /// <summary>
        /// Asks <c>masterNode</c> for the runtime data of the requested topic and consumers.
        /// </summary>
        /// <param name="request">Supplies <c>topicEnum</c> and <c>consumerEnumList</c>.</param>
        /// <returns>The master's answer, or <c>null</c> when <c>masterNode</c> is <c>null</c>.</returns>
        public AllNodesRuntimeData GetAllRuntimeData(GetAllRuntimeDataRequest request)
        {
            // Read the field once: other members of this class reset masterNode to null and may
            // run on another thread, so a second read after the null check could see null.
            ClusterMaster master = this.masterNode;
            if (master == null)
            {
                return null;
            }

            return master.GetAllRuntimeData(request.topicEnum, request.consumerEnumList);
        }

        /// <summary>
        /// Calls <c>MockDisposeAllCluster</c> on <c>masterNode</c> when this node currently has one.
        /// </summary>
        /// <returns>
        /// A success result carrying <c>true</c>, or an error result with code 1 when
        /// <c>masterNode</c> is <c>null</c>.
        /// </returns>
        public CallServiceResult<bool> MockDisposeAllCluster()
        {
            // Read the field once: other members of this class reset masterNode to null and may
            // run on another thread, so a second read after the null check could see null.
            ClusterMaster master = this.masterNode;
            if (master == null)
            {
                return CallServiceResult<bool>.GetError(1, "Current node is not master node!");
            }

            master.MockDisposeAllCluster();
            return CallServiceResult<bool>.GetSuccess(true);
        }

        /// <summary>
        /// Fetches the runtime data of the topic instance selected by <c>request.topicEnum</c>.
        /// </summary>
        /// <param name="request">Its <c>topicEnum</c> selects the topic instance.</param>
        /// <returns>
        /// The instance's runtime data, or <c>null</c> while <c>nodeTopicDataManager</c> is
        /// still <c>null</c>.
        /// </returns>
        public NodeRuntimeData GetRuntimeData(GetRuntimeDataRequest request)
        {
            if (nodeTopicDataManager == null)
            {
                return null;
            }

            var topicNode = nodeTopicDataManager.GetInstance(request.topicEnum);
            return topicNode.GetRuntimeData();
        }



        /// <summary>
        /// Starts a task through <c>TaskHelper.SafeStartTask</c> that forwards the request to
        /// <c>SendGetUnProcessedFinishedCollections</c> of the topic instance selected by
        /// <c>request.topicEnum</c>.
        /// </summary>
        /// <param name="request">Request to forward; its <c>topicEnum</c> selects the topic instance.</param>
        /// <returns>The task returned by <c>TaskHelper.SafeStartTask</c>.</returns>
        public Task<CallServiceResult<SendGetUnProcessedFinishedCollectionsDTO>> SendGetUnProcessedFinishedCollections(SendGetUnProcessedFinishedCollectionsRequest request)
        {
            return TaskHelper.SafeStartTask<CallServiceResult<SendGetUnProcessedFinishedCollectionsDTO>>(
                () => nodeTopicDataManager.GetInstance(request.topicEnum).SendGetUnProcessedFinishedCollections(request));
        }

        /// <summary>
        /// Starts a task through <c>TaskHelper.SafeStartTask</c> that forwards the request to
        /// <c>SendProcessCollections</c> of the topic instance selected by <c>request.topicEnum</c>.
        /// </summary>
        /// <param name="request">Request to forward; its <c>topicEnum</c> selects the topic instance.</param>
        /// <returns>The task returned by <c>TaskHelper.SafeStartTask</c>.</returns>
        public Task<CallServiceResult<List<int>>> SendProcessCollections(SendProcessCollectionsRequest request)
        {
            return TaskHelper.SafeStartTask<CallServiceResult<List<int>>>(
                () => nodeTopicDataManager.GetInstance(request.topicEnum).SendProcessCollections(request));
        }

        /// <summary>
        /// Starts a task through <c>TaskHelper.SafeStartTask</c> that forwards the request to
        /// <c>SendSaveClusterNodeChangeToNotify</c> of the topic instance selected by
        /// <c>request.topicEnum</c>.
        /// </summary>
        /// <param name="request">Request to forward; its <c>topicEnum</c> selects the topic instance.</param>
        /// <returns>The task returned by <c>TaskHelper.SafeStartTask</c>.</returns>
        public Task<CallServiceResult<bool>> SendSaveClusterNodeChangeToNotify(SendSaveClusterNodeChangeToNotifyRequest request)
        {
            return TaskHelper.SafeStartTask<CallServiceResult<bool>>(
                () => nodeTopicDataManager.GetInstance(request.topicEnum).SendSaveClusterNodeChangeToNotify(request));
        }

        /// <summary>
        /// Starts a task through <c>TaskHelper.SafeStartTask</c> that forwards the request to
        /// <c>GetProcessedTreeForRestore</c> of the topic instance selected by
        /// <c>request.topicEnum</c>.
        /// </summary>
        /// <param name="request">Request to forward; its <c>topicEnum</c> selects the topic instance.</param>
        /// <returns>The task returned by <c>TaskHelper.SafeStartTask</c>.</returns>
        public Task<CallServiceResult<Dictionary<byte, IdTreeNode>>> GetProcessedTreeForRestore(GetProcessedTreeForRestoreRequest request)
        {
            return TaskHelper.SafeStartTask<CallServiceResult<Dictionary<byte, IdTreeNode>>>(
                () => nodeTopicDataManager.GetInstance(request.topicEnum).GetProcessedTreeForRestore(request));
        }

        /// <summary>
        /// Starts a task through <c>TaskHelper.SafeStartTask</c> that forwards the request to
        /// <c>SendInstantProcessCollection</c> of the topic instance selected by
        /// <c>request.topicEnum</c>.
        /// </summary>
        /// <param name="request">Request to forward; its <c>topicEnum</c> selects the topic instance.</param>
        /// <returns>The task returned by <c>TaskHelper.SafeStartTask</c>.</returns>
        public Task<CallServiceResult<bool>> SendInstantProcessCollection(SendInstantProcessCollectionRequest request)
        {
            return TaskHelper.SafeStartTask<CallServiceResult<bool>>(
                () => nodeTopicDataManager.GetInstance(request.topicEnum).SendInstantProcessCollection(request));
        }

        /// <summary>
        /// Starts a task through <c>TaskHelper.SafeStartTask</c> that forwards the request to
        /// <c>SendDisposeOnlineCollection</c> of the topic instance selected by
        /// <c>request.topicEnum</c>.
        /// </summary>
        /// <param name="request">Request to forward; its <c>topicEnum</c> selects the topic instance.</param>
        /// <returns>The task returned by <c>TaskHelper.SafeStartTask</c>.</returns>
        public Task<CallServiceResult<bool>> SendDisposeOnlineCollection(SendDisposeOnlineCollectionRequest request)
        {
            return TaskHelper.SafeStartTask<CallServiceResult<bool>>(
                () => nodeTopicDataManager.GetInstance(request.topicEnum).SendDisposeOnlineCollection(request));
        }

        /// <summary>
        /// Starts a task through <c>TaskHelper.SafeStartTask</c> that forwards the request to
        /// <c>SendInstantCollectionFinished</c> of the topic instance selected by
        /// <c>request.topicEnum</c>.
        /// </summary>
        /// <param name="request">Request to forward; its <c>topicEnum</c> selects the topic instance.</param>
        /// <returns>The task returned by <c>TaskHelper.SafeStartTask</c>.</returns>
        public Task<CallServiceResult<bool>> SendInstantCollectionFinished(SendInstantCollectionFinishedRequest request)
        {
            return TaskHelper.SafeStartTask<CallServiceResult<bool>>(
                () => nodeTopicDataManager.GetInstance(request.topicEnum).SendInstantCollectionFinished(request));
        }

        /// <summary>
        /// Starts a task through <c>TaskHelper.SafeStartTask</c> that forwards the request to
        /// <c>SendGetClusterNodeChangeToNotifyCmd</c> of the topic instance selected by
        /// <c>request.topicEnum</c>.
        /// </summary>
        /// <param name="request">Request to forward; its <c>topicEnum</c> selects the topic instance.</param>
        /// <returns>The task returned by <c>TaskHelper.SafeStartTask</c>.</returns>
        public Task<CallServiceResult<bool>> SendGetClusterNodeChangeToNotifyCmd(SendGetClusterNodeChangeToNotifyCmdRequest request)
        {
            return TaskHelper.SafeStartTask<CallServiceResult<bool>>(
                () => nodeTopicDataManager.GetInstance(request.topicEnum).SendGetClusterNodeChangeToNotifyCmd(request));
        }

        /// <summary>
        /// Starts a task through <c>TaskHelper.SafeStartTask</c> that forwards the request to
        /// <c>SendGetClusterNodeChangeToNotifyData</c> of the topic instance selected by
        /// <c>request.topicEnum</c>.
        /// </summary>
        /// <param name="request">Request to forward; its <c>topicEnum</c> selects the topic instance.</param>
        /// <returns>The task returned by <c>TaskHelper.SafeStartTask</c>.</returns>
        public Task<CallServiceResult<List<KeyValuePair<int, List<SmallValue16AndFlag>>>>> SendGetClusterNodeChangeToNotifyData(SendGetClusterNodeChangeToNotifyDataRequest request)
        {
            return TaskHelper.SafeStartTask<CallServiceResult<List<KeyValuePair<int, List<SmallValue16AndFlag>>>>>(
                () => nodeTopicDataManager.GetInstance(request.topicEnum).SendGetClusterNodeChangeToNotifyData(request));
        }


        /// <summary>
        /// Starts a task through <c>TaskHelper.SafeStartTask</c> that forwards the request to
        /// <c>SendUpdateClusterNodes</c> of the topic instance selected by <c>request.topicEnum</c>.
        /// </summary>
        /// <param name="request">Request to forward; its <c>topicEnum</c> selects the topic instance.</param>
        /// <returns>The task returned by <c>TaskHelper.SafeStartTask</c>.</returns>
        public Task<CallServiceResult<List<SmallValue16AndFlag>>> SendUpdateClusterNodes(SendUpdateClusterNodesRequest request)
        {
            return TaskHelper.SafeStartTask<CallServiceResult<List<SmallValue16AndFlag>>>(
                () => nodeTopicDataManager.GetInstance(request.topicEnum).SendUpdateClusterNodes(request));
        }

        /// <summary>
        /// Starts a task through <c>TaskHelper.SafeStartTask</c> that forwards the request to
        /// <c>SendSyncCollectionData</c> of the topic instance selected by <c>request.topicEnum</c>.
        /// </summary>
        /// <param name="request">Request to forward; its <c>topicEnum</c> selects the topic instance.</param>
        /// <returns>The task returned by <c>TaskHelper.SafeStartTask</c>.</returns>
        public Task<CallServiceResult<bool>> SendSyncCollectionData(SendSyncCollectionDataRequest request)
        {
            return TaskHelper.SafeStartTask<CallServiceResult<bool>>(
                () => nodeTopicDataManager.GetInstance(request.topicEnum).SendSyncCollectionData(request));
        }

        /// <summary>
        /// Starts a task through <c>TaskHelper.SafeStartTask</c> that forwards the request to
        /// <c>SendSyncCollectionCmd</c> of the topic instance selected by <c>request.topicEnum</c>.
        /// </summary>
        /// <param name="request">Request to forward; its <c>topicEnum</c> selects the topic instance.</param>
        /// <returns>The task returned by <c>TaskHelper.SafeStartTask</c>.</returns>
        public Task<CallServiceResult<bool>> SendSyncCollectionCmd(SendSyncCollectionCmdRequest request)
        {
            return TaskHelper.SafeStartTask<CallServiceResult<bool>>(
                () => nodeTopicDataManager.GetInstance(request.topicEnum).SendSyncCollectionCmd(request));
        }

        /// <summary>
        /// Starts a task through <c>TaskHelper.SafeStartTask</c> that forwards the request to
        /// <c>SendAssignOnLineClusterNodes</c> of the topic instance selected by
        /// <c>request.topicEnum</c>.
        /// </summary>
        /// <param name="request">Request to forward; its <c>topicEnum</c> selects the topic instance.</param>
        /// <returns>The task returned by <c>TaskHelper.SafeStartTask</c>.</returns>
        public Task<CallServiceResult<bool>> SendAssignOnLineClusterNodes(SendAssignOnLineClusterNodesRequest request)
        {
            return TaskHelper.SafeStartTask<CallServiceResult<bool>>(
                () => nodeTopicDataManager.GetInstance(request.topicEnum).SendAssignOnLineClusterNodes(request));
        }


        /// <summary>
        /// Starts a task through <c>TaskHelper.SafeStartTask</c> that forwards the request to
        /// <c>SendAssignOnlineCollection</c> of the topic instance selected by
        /// <c>request.topicEnum</c>.
        /// </summary>
        /// <param name="request">Request to forward; its <c>topicEnum</c> selects the topic instance.</param>
        /// <returns>The task returned by <c>TaskHelper.SafeStartTask</c>.</returns>
        public Task<CallServiceResult<bool>> SendAssignOnlineCollection(SendAssignOnlineCollectionRequest request)
        {
            return TaskHelper.SafeStartTask<CallServiceResult<bool>>(
                () => nodeTopicDataManager.GetInstance(request.topicEnum).SendAssignOnlineCollection(request));
        }

        /// <summary>
        /// Starts a task through <c>TaskHelper.SafeStartTask</c> that forwards the request to
        /// <c>SendSyncProcessedTree</c> of the topic instance selected by <c>request.topicEnum</c>.
        /// </summary>
        /// <param name="request">Request to forward; its <c>topicEnum</c> selects the topic instance.</param>
        /// <returns>The task returned by <c>TaskHelper.SafeStartTask</c>.</returns>
        public Task<CallServiceResult<bool>> SendSyncProcessedTree(SendSyncProcessedTreeRequest request)
        {
            return TaskHelper.SafeStartTask<CallServiceResult<bool>>(
                () => nodeTopicDataManager.GetInstance(request.topicEnum).SendSyncProcessedTree(request));
        }
    }
}
